Add commit retry and concurrency validation for writes #3320
lawofcycles wants to merge 9 commits
Add automatic retry with exponential backoff when catalog commits fail due to concurrent transactions (CommitFailedException), and integrate the existing validation functions from validate.py into the write path to detect incompatible concurrent modifications (ValidationException). The retry loop is placed in Transaction.commit_transaction(). On each retry attempt, table metadata is refreshed, registered snapshot producers are re-executed to regenerate manifests, and data conflict validation is run. Uncommitted manifests from failed attempts are cleaned up after a successful commit. Validation is performed for _OverwriteFiles and _DeleteFiles based on the table's isolation level (serializable/snapshot). _FastAppendFiles and _MergeAppendFiles do not require validation since appends never conflict. Signed-off-by: Sotaro Hikita <[email protected]>
Skip _validate_no_new_delete_files and _validate_deleted_data_files when conflict_detection_filter is None, matching Java's BaseOverwriteFiles.validate() behavior for rowFilter == AlwaysFalse(). Route isolation level property based on the calling operation. Transaction.delete() uses write.delete.isolation-level (default). Transaction.overwrite(), dynamic_partition_overwrite(), and upsert() use write.update.isolation-level via _isolation_level_property on the snapshot producer. Remove unused WRITE_MERGE_ISOLATION_LEVEL constant. Signed-off-by: Sotaro Hikita <[email protected]>
Use Operation enum instead of string literals for producer construction. Use .value for IsolationLevel string comparison to avoid unreachable statement warning. Signed-off-by: Sotaro Hikita <[email protected]>
Signed-off-by: Sotaro Hikita <[email protected]>
Fix _build_delete_files_partition_predicate overwriting _case_sensitive to True by passing the current value to delete_by_predicate. This caused case-insensitive deletes to fail when _OverwriteFiles was used with a user-specified predicate. Move import random/time to file top level. Add total timeout (commit.retry.total-timeout-ms) to the retry loop. Add comments for intentional validation duplication and cached_property clearing. Stabilize test_commit_retry_on_commit_failed by removing flaky patch.object assertion. Signed-off-by: Sotaro Hikita <[email protected]>
Signed-off-by: Sotaro Hikita <[email protected]>
Signed-off-by: Sotaro Hikita <[email protected]>
Signed-off-by: Sotaro Hikita <[email protected]>
In CI, pyiceberg.table module is loaded twice, creating two distinct Transaction class objects. patch.object on the test-imported Transaction does not affect the runtime Transaction used by Table.append(). Fix by resolving Transaction from pyiceberg.table module at runtime. Signed-off-by: Sotaro Hikita <[email protected]>
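The retry flow described in the commit messages above can be sketched roughly as follows. This is a toy model and not the PR's code: the exception classes are local stand-ins, the `do_commit`, `refresh`, `validate`, and `rebuild` callables are hypothetical hooks, and the backoff formula (doubling with jitter, capped at the maximum wait) is an assumption. The order of the validate and rebuild steps is also assumed.

```python
import random
import time


class CommitFailedException(Exception):
    """Stand-in for the catalog's 'another transaction committed first' error."""


class ValidationException(Exception):
    """Stand-in for a real data conflict; it is never retried."""


def commit_with_retry(do_commit, refresh, validate, rebuild,
                      num_retries=4, min_wait_ms=100, max_wait_ms=60_000,
                      sleep=time.sleep):
    """Call do_commit(); on CommitFailedException refresh, validate, rebuild, retry.

    Returns the number of retries that were needed.
    """
    for attempt in range(num_retries + 1):
        try:
            do_commit()
            return attempt
        except CommitFailedException:
            if attempt == num_retries:
                raise  # retries exhausted, surface the commit failure
            # Assumed backoff: double the wait per attempt, cap it, add jitter.
            wait_ms = min(min_wait_ms * 2 ** attempt, max_wait_ms)
            sleep(wait_ms * random.uniform(0.5, 1.0) / 1000)
            refresh()   # reload table metadata
            validate()  # may raise ValidationException, which aborts the loop
            rebuild()   # regenerate manifests against the refreshed metadata
```

A `ValidationException` raised by `validate()` propagates out of the loop unchanged, which models the "abort instead of retry" behavior for real conflicts.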
Benchmark results
This PR brings three capabilities to PyIceberg's write path.
To validate (3), I benchmarked concurrent appends using the NYC Yellow Taxi dataset (2024-01, 2.9M rows, 19 columns) with Glue Data Catalog + S3.
Before vs After
Without this PR, concurrent appends fail immediately with
(N workers x 10 batches x 1K rows,
Internal retry vs user-side retry
Compared the internal retry (this PR) against a user-side retry that catches
(3 batches per worker, ~370K-1.5M rows per batch depending on worker count)
Internal retry is faster because it reuses data files already written to S3 and only regenerates manifests on retry. User-side retry rewrites Parquet files on every attempt. Interestingly, internal retry actually performs more retries than user-side retry (88 vs 50 total retries at 8 workers), because the shorter retry window increases commit attempt density. Despite more retries, the total time is lower because each retry is much cheaper.
Tuning
| min-wait-ms | Total time | Total retries |
|---|---|---|
| 100 | 158s | 78 |
| 500 | 126s | 115 |
| 1000 | 238s | 67 |
| 2000 | 235s | 63 |
| 3000 | 206s | 41 |
The default (100ms, matching Java Iceberg) works reasonably well, but 500ms gave the lowest total time in this Glue benchmark (126s vs 158s at the default). Too short a wait causes contention storms; too long a wait wastes time idling. The optimal value depends on the catalog's commit latency.
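To make the tuning knob concrete, here is a small sketch of how `min-wait-ms` and `max-wait-ms` would bound the wait before each retry. It assumes the wait doubles per attempt and ignores jitter; the exact formula used by the PR is not shown in this thread, and `backoff_schedule_ms` is a hypothetical helper name.

```python
def backoff_schedule_ms(num_retries=4, min_wait_ms=100, max_wait_ms=60_000):
    """Upper bound on the wait before each retry, assuming the wait doubles per attempt."""
    return [min(min_wait_ms * 2 ** attempt, max_wait_ms) for attempt in range(num_retries)]


print(backoff_schedule_ms())                 # [100, 200, 400, 800]
print(backoff_schedule_ms(min_wait_ms=500))  # [500, 1000, 2000, 4000]
```

Under this assumption, raising `min-wait-ms` from 100 to 500 spreads the four retries over about 7.5s instead of 1.5s, which is one plausible reading of why fewer workers collide on a catalog with slower commits.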
qzyu999 left a comment
Hi @lawofcycles, thanks so much for this amazing PR. I took a look and so far found two places with minor gaps that can be easily patched.
The first is regarding AssertTableUUID, where I notice a pattern of repeatedly adding and removing it inside the retry loop for commit_transaction(). I believe this can be resolved simply by moving the addition outside the for-loop.
The second is also regarding commit_transaction(), where in the case of an abort (e.g., ValidationException), there will be some orphaned manifest files. This can be easily fixed by adding a try/except around the for-loop itself, making sure upon failure that both _uncommitted_manifests and _written_manifests are cleared.
Thanks again for the great work, I look forward to #3320 merging so that I may integrate the changes into #3131, PTAL!
    for attempt in range(num_retries + 1):
        try:
            self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
suggestion: Here AssertTableUUID is appended to self._requirements on every iteration of the retry loop, but below in _rebuild_snapshot_updates it's removed again with:

    self._requirements = tuple(r for r in self._requirements if not isinstance(r, (AssertRefSnapshotId, AssertTableUUID)))

This can be simplified by moving self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),) outside the for-loop and updating the line in _rebuild_snapshot_updates to simply:

    self._requirements = tuple(r for r in self._requirements if not isinstance(r, AssertRefSnapshotId))

The reason is that AssertTableUUID stays constant the whole time, so we're simply adding and removing it on each retry.
    def _cleanup_uncommitted(self) -> None:
        """Delete manifest files from failed retry attempts."""
        for path in self._uncommitted_manifests:
            try:
                self._io.delete(path)
            except Exception:
                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
        self._uncommitted_manifests.clear()
suggestion: We could also add a second, similar function as follows:

    def _clean_all_uncommitted(self) -> None:
        """Clean up all manifests written during this producer's lifecycle on abort."""
        # Requires `import itertools` at module level.
        for path in itertools.chain(self._uncommitted_manifests, self._written_manifests):
            try:
                self._io.delete(path)
            except Exception:
                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
        self._uncommitted_manifests.clear()
        self._written_manifests.clear()

Then in Transaction.commit_transaction(), we can add a try/except around the for-loop as follows:

    try:
        for attempt in range(num_retries + 1):
            try:
                self._table._do_commit(...)
                self._cleanup_uncommitted_manifests()
                break
            except CommitFailedException:
                ...  # retry logic
    except Exception:
        # Catch ValidationException or retry exhaustion
        for producer in self._snapshot_producers:
            producer._clean_all_uncommitted()
        raise

This would then allow the PyIceberg implementation to mirror the cleanAll() method in Java. In the current implementation, the retry for-loop only clears the _uncommitted_manifests from previous failed retries. We can extend this with _clean_all_uncommitted, which clears both those and the _written_manifests from the current attempt in the case of a permanent abort. This would fix the gap where manifests orphaned by a ValidationException (or other permanent failure) are not cleaned up.

I also think it's worth mentioning that this fix could be cleanly added to this PR without waiting for a full Delete orphaned files implementation in PyIceberg. WDYT about adding this into the current PR?
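The effect of the suggested cleanup can be illustrated with a self-contained toy model. Everything here is hypothetical scaffolding: `InMemoryIO` stands in for the table's file IO, `Producer` keeps only the two manifest lists named in the suggestion, and `commit_or_clean` collapses the retry loop into a single call that may fail permanently.

```python
import itertools
import logging

logger = logging.getLogger(__name__)


class InMemoryIO:
    """Hypothetical stand-in for a file IO: just a set of existing paths."""

    def __init__(self, paths):
        self.paths = set(paths)

    def delete(self, path):
        self.paths.remove(path)  # raises KeyError if the path is already gone


class Producer:
    """Toy snapshot producer tracking the manifests it has written."""

    def __init__(self, io):
        self._io = io
        self._uncommitted_manifests = []  # left over from earlier failed attempts
        self._written_manifests = []      # written by the current attempt

    def _clean_all_uncommitted(self):
        for path in itertools.chain(self._uncommitted_manifests, self._written_manifests):
            try:
                self._io.delete(path)
            except Exception:
                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
        self._uncommitted_manifests.clear()
        self._written_manifests.clear()


def commit_or_clean(producers, do_commit):
    """Run do_commit(); on a permanent failure delete every manifest written so far."""
    try:
        do_commit()
    except Exception:
        for producer in producers:
            producer._clean_all_uncommitted()
        raise
```

In this model, data files are left alone on abort; only manifests tracked by the producers are deleted, and a failed delete is logged rather than masking the original exception.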
Closes #3319
Closes #819
Closes #269
Rationale for this change
PyIceberg currently fails immediately with CommitFailedException when a concurrent transaction commits first, regardless of whether the writes actually conflict. Java Iceberg handles this transparently through its retry loop in SnapshotProducer.commit().
This PR adds automatic commit retry with exponential backoff and data conflict validation to PyIceberg, matching Java Iceberg's behavior. On CommitFailedException, the retry loop refreshes table metadata, re-runs validation, and regenerates manifests. If validation detects a real data conflict, the operation aborts with ValidationException instead of retrying.
The retry loop is placed in Transaction.commit_transaction() rather than in individual snapshot producers. This is necessary because Transaction.delete() uses two producers (_DeleteFiles + _OverwriteFiles) that must be committed atomically. Retrying at the producer level would break this atomicity.
Validation behavior follows Java's BaseOverwriteFiles.validate(), using the existing validation functions from validate.py that were contributed through #1935, #1938, #2050, and #3049.
Are these changes tested?
Yes. Unit tests and integration tests covering retry success, ValidationException abort, retry exhaustion, isolation levels, partition-level conflict detection, manifest cleanup, and producer state reset.
Are there any user-facing changes?
Yes. Previously, all concurrent write conflicts resulted in CommitFailedException.
Now:
ValidationException instead of CommitFailedException
The following new table properties are supported.
- commit.retry.num-retries (default: 4)
- commit.retry.min-wait-ms (default: 100)
- commit.retry.max-wait-ms (default: 60000)
- write.delete.isolation-level (default: serializable)
- write.update.isolation-level (default: serializable)
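As a rough illustration of how the listed properties and defaults combine, the sketch below resolves effective settings from a table's string-valued properties. The property names and defaults are taken from the list above; the helper `effective_write_settings` and the shape of its result are hypothetical and not part of PyIceberg.

```python
# Defaults as listed in the PR description; table properties are strings.
DEFAULTS = {
    "commit.retry.num-retries": "4",
    "commit.retry.min-wait-ms": "100",
    "commit.retry.max-wait-ms": "60000",
    "write.delete.isolation-level": "serializable",
    "write.update.isolation-level": "serializable",
}


def effective_write_settings(table_properties):
    """Overlay user-set table properties on the defaults and parse the numeric ones."""
    merged = {**DEFAULTS, **table_properties}
    return {
        "num_retries": int(merged["commit.retry.num-retries"]),
        "min_wait_ms": int(merged["commit.retry.min-wait-ms"]),
        "max_wait_ms": int(merged["commit.retry.max-wait-ms"]),
        "delete_isolation": merged["write.delete.isolation-level"],
        "update_isolation": merged["write.update.isolation-level"],
    }


# Example: a Glue-backed table tuned per the benchmark, with snapshot isolation for updates.
settings = effective_write_settings({
    "commit.retry.min-wait-ms": "500",
    "write.update.isolation-level": "snapshot",
})
print(settings["min_wait_ms"], settings["update_isolation"])  # 500 snapshot
```

Any property that is not set on the table falls back to its default, so a table with no overrides behaves as described in the list.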